package org.example.flink.connections.pulsar;

import cn.hutool.core.date.DateUtil;
import org.apache.pulsar.client.api.*;

/**
 * Manual test harness that subscribes to a Pulsar topic and prints every message it receives.
 *
 * <p>The subscription is {@code Shared} and {@code Durable}, and starts from the earliest
 * available message. Messages are handled by a listener callback, so {@link #main(String[])}
 * returns as soon as the subscription has been created.
 */
public class TestConsumer {

    /** Service URL of the Pulsar instance used for this local test. */
    private static final String SERVICE_URL = "http://127.0.0.1:8080";

    /** Topic the consumer subscribes to. */
    private static final String TOPIC = "persistent://public/default/delay10";

    /** Name of the subscription created on {@link #TOPIC}. */
    private static final String SUBSCRIPTION_NAME = "t21";

    /**
     * Builds the client, registers the listening consumer and installs a shutdown hook that
     * closes both when the JVM terminates.
     *
     * @param args ignored
     * @throws Exception if the client cannot be built or the subscription cannot be created
     */
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl(SERVICE_URL)
                .build();

        Consumer<String> consumer = subscribe(client);

        // The listener keeps receiving after main() returns, so cleanup is tied to JVM shutdown
        // rather than to the end of this method.
        Runtime.getRuntime().addShutdownHook(
                new Thread(() -> closeAll(consumer, client), "test-consumer-shutdown"));
    }

    /**
     * Creates the listening consumer; closes {@code client} if the subscription fails so that
     * a failed start does not leave the client open.
     */
    private static Consumer<String> subscribe(PulsarClient client) throws PulsarClientException {
        try {
            return client.newConsumer(Schema.STRING)
                    .subscriptionName(SUBSCRIPTION_NAME)
                    .subscriptionType(SubscriptionType.Shared)
                    .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                    .subscriptionMode(SubscriptionMode.Durable)
                    .topic(TOPIC)
                    .messageListener(TestConsumer::printAndAcknowledge)
                    .subscribe();
        } catch (PulsarClientException e) {
            try {
                client.close();
            } catch (PulsarClientException closeFailure) {
                // Keep the original failure as the primary error.
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
    }

    /**
     * Prints the message payload with the current time, then acknowledges the message. If the
     * acknowledgement fails, the message is negatively acknowledged to request redelivery.
     */
    private static void printAndAcknowledge(Consumer<String> source, Message<String> msg) {
        // getValue() already yields a String for Schema.STRING; no copy is needed.
        System.out.println("Msg:" + msg.getValue() + " now=" + DateUtil.date());
        try {
            source.acknowledge(msg);
        } catch (PulsarClientException e) {
            System.err.println("Failed to acknowledge message " + msg.getMessageId());
            e.printStackTrace();
            source.negativeAcknowledge(msg);
        }
    }

    /** Closes the consumer and then the client, reporting (not propagating) close failures. */
    private static void closeAll(Consumer<String> consumer, PulsarClient client) {
        try {
            consumer.close();
        } catch (PulsarClientException e) {
            System.err.println("Failed to close consumer: " + e);
        } finally {
            try {
                client.close();
            } catch (PulsarClientException e) {
                System.err.println("Failed to close client: " + e);
            }
        }
    }

}
